
#pragma once

#include <sys/types.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/timerfd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <assert.h>
#include <cerrno>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <algorithm>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <typeinfo>
#include <unordered_map>
#include <utility>
#include <vector>

#pragma once
// Log levels: debug, normal, error.
#define DEBUG 0
#define NORMAL 1
#define ERROR 2

/// Returns the printable name of a log level.
/// @param level one of DEBUG, NORMAL or ERROR.
/// @return "DEBUG" or "NORMAL" for those levels; "ERROR" for every other value.
///
/// Declared `inline` because this header defines the function body: without it,
/// including the header from more than one translation unit produces
/// duplicate-definition link errors.
inline std::string level_to_string(int level)
{
    switch (level)
    {
    case DEBUG:
        return "DEBUG";
    case NORMAL:
        return "NORMAL";
    default:
        return "ERROR";
    }
}

#define DEFAULT_LOG_LEVEL ERROR // Change this threshold to control which log levels are printed
// Output format: [thread_id:..][year-month-day hour:minute:second][level][file:line]:message
// A message is printed only when its level is <= DEFAULT_LOG_LEVEL.
//
// Implementation notes:
//  * localtime_r is used instead of localtime: the macro is expanded in code that runs on
//    several threads, and localtime returns a pointer to shared static storage.
//  * snprintf bounds every write to the fixed-size scratch buffers.
//  * `level` is parenthesized so an expression argument is compared as a whole.
//  * The expansion is a plain brace block, exactly as before, so existing call sites
//    compile unchanged.
#define LOG(level, format, ...)                                                                              \
    {                                                                                                        \
        if ((level) <= DEFAULT_LOG_LEVEL)                                                                    \
        {                                                                                                    \
            time_t timep = time(nullptr);                                                                    \
            struct tm tm_buf = {};                                                                           \
            localtime_r(&timep, &tm_buf);                                                                    \
            char time_buffer[48] = {0};                                                                      \
            snprintf(time_buffer, sizeof(time_buffer), "[%d-%d-%d %d:%d:%d]", tm_buf.tm_year + 1900,         \
                     tm_buf.tm_mon + 1, tm_buf.tm_mday, tm_buf.tm_hour, tm_buf.tm_min, tm_buf.tm_sec);       \
            char log_buffer[48] = {0};                                                                       \
            snprintf(log_buffer, sizeof(log_buffer), "[%s]", level_to_string(level).c_str());                \
            fprintf(stdout, "[thread_id:%p]%s%s[%s:%d]:" format "\n", (void *)pthread_self(), time_buffer,   \
                    log_buffer, __FILE__, __LINE__, ##__VA_ARGS__);                                          \
        }                                                                                                    \
    }

// Convenience wrappers, one per level.
#define DEBUG_LOG(format, ...) LOG(DEBUG, format, ##__VA_ARGS__)
#define NORMAL_LOG(format, ...) LOG(NORMAL, format, ##__VA_ARGS__)
#define ERROR_LOG(format, ...) LOG(ERROR, format, ##__VA_ARGS__)

const static int defualt_backlog = 64; // 默认全连接队列大小
class Socket
{
public:
    Socket(int sockfd = -1)
        : _sockfd(sockfd)
    {
    }

    ~Socket()
    {
        Close();
    }

    int Create(bool IsReuseAddr = false)
    {
        _sockfd = socket(AF_INET, SOCK_STREAM, 0);
        if (_sockfd == -1)
        {
            ERROR_LOG("socket error!");
            abort();
        }

        if (IsReuseAddr)
        {
            int val = 1;
            setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
        }
        return _sockfd; // 让调用方判断socket是否调用成功
    }

    int Bind(uint16_t port, const std::string &ip = "0.0.0.0")
    {
        sockaddr_in local;
        std::memset(&local, 0, sizeof(local));
        local.sin_addr.s_addr = inet_addr(ip.c_str());
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        int n = bind(_sockfd, (sockaddr *)&local, sizeof(local));
        return n; // 让调用方判断bind是否调用成功
    }

    int Listen(int backlog = defualt_backlog)
    {
        int n = listen(_sockfd, backlog);
        return n; // 让调用方判断listen是否调用成功
    }

    int Accept(sockaddr_in *out = nullptr, socklen_t *len = nullptr)
    {
        int connfd = accept(_sockfd, (sockaddr *)out, len);
        return connfd; // 让调用方判断accept是否调用成功
    }

    int Connect(uint16_t port, const std::string &ip)
    {
        sockaddr_in local;
        std::memset(&local, 0, sizeof(local));
        int connfd = connect(_sockfd, (sockaddr *)&local, sizeof(local));
        return connfd; // 让调用方判断connfd是否成功
    }

    int Recv(void *buf, size_t size, int flag = 0) // 默认阻塞
    {
        int n = recv(_sockfd, buf, size, flag);
        if (n <= 0)
        {
            if (errno == EAGAIN || errno == EINTR)
                return 0;
            ERROR_LOG("recv error: %s",strerror(errno));
            return -1;
        }
        return n;
    }

    ssize_t Send(const void *buf, size_t size, int flag = 0)
    {
        ssize_t n = send(_sockfd, buf, size, flag);
        if (n <= 0)
        {
            if (errno == EAGAIN || errno == EINTR)
                return 0;
            ERROR_LOG("send error!");
            return -1;
        }
        return n;
    }

    void Close()
    {
        close(_sockfd);
    }

    int GetFd() { return _sockfd; }

private:
    int _sockfd;
};

const static int default_buffer_size = 1024; // Initial capacity, in bytes, of every Buffer

/// Growable byte buffer with a read cursor and a write cursor.
///
/// Layout: [0, _reader) already consumed, [_reader, _writer) readable data,
/// [_writer, size) free tail space. Data is stored in a std::vector<char>.
class Buffer
{
public:
    Buffer()
        : _buffer(default_buffer_size), _reader(0), _writer(0)
    {}

    char *Begin() { return _buffer.data(); }
    char *ReaderPosition() { return _buffer.data() + _reader; }                   // address of the next byte to read
    char *WriterPosition() { return _buffer.data() + _writer; }                   // address of the next byte to write
    int64_t ReadableSpaceSize() { return _writer - _reader; }                     // number of readable bytes
    int64_t TailFreeSpaceSize() { return static_cast<int64_t>(_buffer.size()) - _writer; } // free bytes after the write cursor
    int64_t HeadFreeSpaceSize() { return _reader; }                               // free bytes before the read cursor
    int64_t FreeSpaceSize() { return TailFreeSpaceSize() + HeadFreeSpaceSize(); } // total free bytes
    void WriterOffset(int64_t len) { _writer += len; }
    void ReaderOffset(int64_t len) { _reader += len; }

    /// Guarantees that at least `len` bytes can be written at WriterPosition().
    void EnsureHaveWriteSpace(int64_t len)
    {
        if (len <= TailFreeSpaceSize())
            return; // enough room after the write cursor already
        if (len <= FreeSpaceSize())
        {
            // Tail is too small but head + tail is enough: slide the readable data to the
            // front. Both cursors must follow the data, otherwise the next write would
            // still start at the old position and run past the end of the vector.
            const int64_t pending = ReadableSpaceSize();
            std::copy(ReaderPosition(), WriterPosition(), Begin());
            _reader = 0;
            _writer = pending;
            return;
        }
        // Not enough room even after compaction: grow the storage.
        _buffer.resize(_buffer.size() + len);
    }

    /// Appends `len` bytes from `data`. When `isOffset` is false the write cursor is not
    /// advanced (the bytes are staged but not yet counted as readable).
    void Write(void *data, int64_t len, bool isOffset = true)
    {
        if (len <= 0)
            return; // nothing to copy
        EnsureHaveWriteSpace(len);
        const char *src = static_cast<const char *>(data);
        std::copy(src, src + len, WriterPosition());
        if (isOffset)
            WriterOffset(len);
    }

    /// Appends the contents of `str`.
    void Write(const std::string &str, bool isOffset = true)
    {
        const int64_t len = static_cast<int64_t>(str.size());
        EnsureHaveWriteSpace(len);
        std::copy(str.data(), str.data() + len, WriterPosition());
        if (isOffset)
            WriterOffset(len);
    }

    /// Appends the readable part of another buffer; `buf` itself is not consumed.
    void Write(Buffer &buf, bool isOffset = true)
    {
        const int64_t len = buf.ReadableSpaceSize();
        EnsureHaveWriteSpace(len);
        std::copy(buf.ReaderPosition(), buf.ReaderPosition() + len, WriterPosition());
        if (isOffset)
            WriterOffset(len);
    }

    /// Copies exactly `len` bytes into `data`. Does nothing when `len` is negative or
    /// larger than the amount of readable data. When `isOffset` is false the data is
    /// only peeked at (the read cursor does not move).
    void Read(void *data, int64_t len, bool isOffset = true)
    {
        if (len < 0 || len > ReadableSpaceSize())
            return;
        // Copy only the requested `len` bytes; copying everything readable would
        // overflow a destination that was sized for `len`.
        std::copy(ReaderPosition(), ReaderPosition() + len, static_cast<char *>(data));
        if (isOffset)
            ReaderOffset(len);
    }

    /// Replaces `str` with all readable data.
    /// As in the original code, nothing is read when `str` currently holds more
    /// characters than the buffer has readable bytes.
    void Read(std::string &str, bool isOffset = true)
    {
        const int64_t available = ReadableSpaceSize();
        if (static_cast<int64_t>(str.size()) > available)
            return;
        str.resize(available);
        std::copy(ReaderPosition(), ReaderPosition() + available, &str[0]);
        if (isOffset)
            ReaderOffset(available);
    }

private:
    std::vector<char> _buffer; // backing storage; a vector rather than std::string so '\0' bytes need no special handling
    int64_t _reader;           // index of the next byte to read
    int64_t _writer;           // index of the next byte to write
};

class EventLoop;
using CallBack = std::function<void()>;

/// Associates one file descriptor with the events it is registered for, the events
/// most recently reported for it, and the callbacks to run for those events.
/// Registration changes are forwarded to the owning EventLoop.
class Channel
{
public:
    /// @param pl the EventLoop used to register/unregister this channel.
    /// @param fd the descriptor to watch.
    /// Both event masks start at 0. `_revents` in particular must be initialised because
    /// IsRead()/IsWrite() may be queried before any event has been reported.
    Channel(EventLoop *pl, int fd) : _events(0), _revents(0), _fd(fd), _pl(pl) {}

    int GetFd() { return _fd; } // the watched descriptor

    // These two inspect the most recently *triggered* events, not the registered ones.
    bool IsRead() { return _revents & EPOLLIN; }   // was the descriptor reported readable?
    bool IsWrite() { return _revents & EPOLLOUT; } // was the descriptor reported writable?

    void SetReadCallBack(CallBack cb) { _read_call_back = cb; }
    void SetWriteCallBack(CallBack cb) { _write_call_back = cb; }
    void SetAbnormalCallBack(CallBack cb) { _abnormal_call_back = cb; }
    void SetAnyCallBack(CallBack cb) { _any_call_back = cb; }
    void SetBrokenCallBack(CallBack cb) { _broken_call_back = cb; }

    // Start monitoring readability.
    void EnableRead() { _events |= EPOLLIN; Update(); }
    // Start monitoring writability.
    void EnableWrite() { _events |= EPOLLOUT; Update(); }
    void Update();
    void Remove();
    /*---- The following are only declared here; they call into EventLoop, so their ----*/
    /*---- definitions appear after that class is complete.                          ----*/
    void CancelReadMonitor();
    void CancelWriteMonitor();
    void CancelAbnormalMonitor();
    void CancelBrokenMonitor();
    void CancelAllMonitor();

    void SetCurrentEvents(int32_t events);
    /*---------------------------------------------------------------------------------*/

    // Records the events reported for this descriptor by the poller.
    void SetTriggerEvents(int32_t revents) { _revents = revents; }

    uint32_t GetCurrentEvnts() { return _events; } // currently registered event mask

    /// Dispatches to the callbacks that match the triggered events.
    ///  - EPOLLIN / EPOLLRDHUP / EPOLLPRI: "any" callback, then read callback.
    ///  - EPOLLOUT: "any" callback, then write callback.
    ///  - otherwise EPOLLERR: abnormal callback; otherwise EPOLLHUP: broken callback.
    /// The "any" callback runs before the specific one; the original comment says its
    /// main purpose is refreshing the inactivity timer.
    void CallBackBaseOnEvents()
    {
        if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
        {
            if (_any_call_back) _any_call_back();
            if (_read_call_back) _read_call_back();
        }
        if (_revents & EPOLLOUT)
        {
            if (_any_call_back) _any_call_back();
            if (_write_call_back) _write_call_back();
        }
        else if (_revents & EPOLLERR) // error reported on the descriptor
        {
            if (_abnormal_call_back) _abnormal_call_back();
        }
        else if (_revents & EPOLLHUP) // hang-up; checked last
        {
            if (_broken_call_back) _broken_call_back();
        }
    }

private:
    int32_t _events;              // events currently registered
    int32_t _revents;             // events most recently triggered
    int _fd;                      // watched descriptor
    CallBack _read_call_back;     // read-event callback
    CallBack _write_call_back;    // write-event callback
    CallBack _abnormal_call_back; // error-event callback
    CallBack _any_call_back;      // runs for read and write events, before the specific callback
    CallBack _broken_call_back;   // hang-up callback

    EventLoop *_pl;
};

const static int default_events_num = 1024; // capacity of the ready-event array passed to epoll_wait
/// Owns an epoll instance and the fd -> Channel table of everything registered with it.
/// The epoll descriptor is closed on destruction; the class is non-copyable so that
/// two objects can never close the same descriptor.
class Poller
{
public:
    /// Creates the epoll instance; aborts the process if that fails.
    Poller()
    {
        _epfd = epoll_create(88);
        if (_epfd == -1)
        {
            ERROR_LOG("epoll_create error!");
            abort();
        }
    }

    ~Poller()
    {
        if (_epfd != -1)
            close(_epfd);
    }

    Poller(const Poller &) = delete;
    Poller &operator=(const Poller &) = delete;

private:
    // Is a Channel with this descriptor already registered?
    bool IsHaveChannel(Channel *pc)
    {
        return _channels.find(pc->GetFd()) != _channels.end();
    }

public:
    /// Registers the channel's current event mask with epoll: EPOLL_CTL_MOD when the
    /// descriptor is already known, EPOLL_CTL_ADD (and insertion into the table)
    /// otherwise. Aborts the process if epoll_ctl fails.
    void UpdateEvent(Channel *pc)
    {
        const int fd = pc->GetFd();
        epoll_event ev;
        std::memset(&ev, 0, sizeof(ev)); // do not hand uninitialised bytes to the kernel
        ev.data.fd = fd;
        ev.events = pc->GetCurrentEvnts();

        const bool known = IsHaveChannel(pc);
        const int op = known ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
        if (epoll_ctl(_epfd, op, fd, &ev) == -1)
        {
            ERROR_LOG("epoll_ctl error: %s", strerror(errno));
            abort();
        }
        if (!known)
            _channels.insert(std::make_pair(fd, pc));
    }

    /// Stops monitoring the channel's descriptor and forgets the channel.
    /// Does nothing if the descriptor is not registered. Aborts if epoll_ctl fails.
    void RemoveEvent(Channel *pc)
    {
        if (!IsHaveChannel(pc))
            return;

        const int fd = pc->GetFd();
        epoll_event ev;
        std::memset(&ev, 0, sizeof(ev));
        ev.data.fd = fd;
        ev.events = pc->GetCurrentEvnts();
        _channels.erase(fd);

        if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, &ev) == -1)
        {
            ERROR_LOG("epoll_ctl error: %s", strerror(errno));
            abort();
        }
    }

    /// Blocks in epoll_wait, then appends every Channel with pending events to `*v`
    /// after storing the reported events in it. Returns without adding anything when
    /// the wait is interrupted (EINTR); aborts on any other epoll_wait error.
    void Poll(std::vector<Channel *> *v)
    {
        const int n = epoll_wait(_epfd, _eps, default_events_num, -1);
        if (n == -1)
        {
            if (errno == EINTR)
                return;
            ERROR_LOG("epoll_wait error: %s", strerror(errno));
            abort();
        }

        for (int i = 0; i < n; i++)
        {
            auto it = _channels.find(_eps[i].data.fd);
            assert(it != _channels.end());
            v->push_back(it->second);                     // hand the ready channel to the caller
            it->second->SetTriggerEvents(_eps[i].events); // record what was reported for it
        }
    }

private:
    int _epfd; // epoll instance descriptor
    epoll_event _eps[default_events_num];
    std::unordered_map<int, Channel *> _channels; // registered channels, keyed by descriptor (not owned)
};

using TimerCallBack = std::function<void()>;
using Realease = std::function<void()>;

/// A deferred action that fires when the object is destroyed.
///
/// On destruction the task callback runs unless cancel() was called, and then the
/// release hook (if one was installed with SetRealease) runs regardless of cancellation.
class TimerTask
{
public:
    /// @param timerid identifier of this task.
    /// @param cb      action to run on destruction.
    /// @param timeout delay associated with the task (returned by timeout()).
    TimerTask(uint32_t timerid, TimerCallBack cb, uint32_t timeout)
        : _callback(cb), _timeout(timeout), _timerid(timerid), _cancel(false)
    {
    }

    ~TimerTask()
    {
        // Both functors are checked before being invoked: calling an empty std::function
        // throws std::bad_function_call, and an exception escaping a destructor
        // terminates the program.
        if (!_cancel && _callback)
            _callback();
        if (_rcb)
            _rcb(); // lets the owner drop its bookkeeping entry for this task
    }

    uint32_t timeout() { return _timeout; }
    void cancel() { _cancel = true; }
    void SetRealease(Realease cb) { _rcb = cb; }

private:
    TimerCallBack _callback; // the deferred action
    uint32_t _timeout;       // delay configured for this task
    uint32_t _timerid;       // identifier of this task
    bool _cancel;            // true once the task has been cancelled
    Realease _rcb;           // release hook, run at the end of the destructor
};

using TimerSmartPtr = std::shared_ptr<TimerTask>;
using TimerWeakPtr = std::weak_ptr<TimerTask>;
class TimerWheel // 时间轮
{
private:
    static int GetTimerfd()
    {
        int fd = timerfd_create(CLOCK_MONOTONIC, 0);
        if (fd == -1)
        {
            ERROR_LOG("timerfd_create error!");
            abort();
        }
        struct itimerspec itime;
        itime.it_value.tv_sec = 1;
        itime.it_value.tv_nsec = 0; // 第一次超时时间为1s后
        itime.it_interval.tv_sec = 1;
        itime.it_interval.tv_nsec = 0; // 第一次超时后，每次超时的间隔时
        timerfd_settime(fd, 0, &itime, NULL);
        return fd;
    }

    void ReadFromTimerfd()
    {
        uint64_t times;
        int ret = read(_timerfd, &times, 8);
        if (ret < 0)
        {
            ERROR_LOG("read timerfd error!");
            abort();
        }
    }

    void OnTime() // 超时，需要读取数据和转动一次秒针
    {
        ReadFromTimerfd();
        tick();
    }

    void AddTimerToQueue(uint32_t timerid, TimerCallBack cb, uint32_t timeout) // 添加定时任务
    {
        TimerSmartPtr tsp(new TimerTask(timerid, cb, timeout));
        tsp->SetRealease(std::bind(&TimerWheel::RemoveFromHash, this, timerid));
        int pos = (_tick + timeout) % _capacity;
        _tasks[pos].push_back(tsp);

        _smartptrs[timerid] = tsp; // 存储
    }

    void TimerRefreshToQueue(uint32_t timerid) // 定时任务刷新
    {
        auto it = _smartptrs.find(timerid);
        if (it == _smartptrs.end())
            return;

        TimerSmartPtr tsp = _smartptrs[timerid].lock();
        int pos = (_tick + tsp->timeout()) % _capacity;
        _tasks[pos].push_back(tsp);

        _smartptrs[timerid] = tsp; // 既然更新了，就说明旧的不用了
    }

    void TimerDelToQueue(uint32_t timerid)
    {
        auto it = _smartptrs.find(timerid);
        if (it == _smartptrs.end())
            return; // 不存在不需要取消

        TimerSmartPtr ptr = _smartptrs[timerid].lock();
        if(ptr) ptr->cancel();
    }

public:
    TimerWheel(EventLoop *loop)
        : _loop(loop), _timerfd(GetTimerfd()),
          _capacity(60), _tasks(_capacity) // 时间轮的默认大小为60(对应钟表)
          ,
          _tick(0), // 默认从0位置开始
          _channel_ptr(new Channel(_loop, _timerfd))
    {
        _channel_ptr->SetReadCallBack(std::bind(&TimerWheel::OnTime, this)); // 设置定时器超时任务
        _channel_ptr->SetCurrentEvents(EPOLLIN);
    }

    void tick()
    {
        // “秒针”所指的内容就是触发的定时事件
        _tasks[_tick].clear();
        ++_tick;
        _tick %= _capacity;
    }

    /*--------线程安全化，执行定时任务的实质是把定时任务压入任务队列---------*/
    void AddTimer(uint32_t timerid, TimerCallBack cb, uint32_t timeout);

    void TimerRefresh(uint32_t timerid);

    void TimerDel(uint32_t timerid);
    /*----------不同的线程操作同一个TimerWheel时，这些任务都会被放在EventLoop线程内部执行--------*/

    bool HasTimer(uint32_t timerid)
    {
        if (_smartptrs.count(timerid))
            return true;
        return false;
    }

private:
    uint32_t _tick; // "秒钟"-滴答
    uint32_t _capacity;
    std::vector<std::vector<TimerSmartPtr>> _tasks;
    std::unordered_map<uint32_t, TimerWeakPtr> _smartptrs; // 方便快速定位每一个任务对象的智能指针。注意，要使用弱引用，否则就会延长对象的生命周期
    int _timerfd;                                          // 定时器文件描述符
    EventLoop *_loop;
    std::unique_ptr<Channel> _channel_ptr;

    void RemoveFromHash(uint32_t timerid)
    {
        _smartptrs.erase(timerid);
    }
};

using Functor = std::function<void()>; // 压入任务队列的任务对象
class EventLoop
{

public:
    EventLoop()
        : _thread_id(std::this_thread::get_id()),
          _poller_ptr(new Poller()),
          _event_fd(GetEventFd()),
          _channel_ptr(new Channel(this, _event_fd)), _tw(this)
    {
        _channel_ptr->SetReadCallBack(std::bind(&EventLoop::ReadFromEventfd, this));
        _channel_ptr->EnableRead();
    }

    void Start() // 启动EventLoop模块
    {
        while (true)
        {
            std::vector<Channel *> v;
            _poller_ptr->Poll(&v);
            for (auto &e : v)
            {
                e->CallBackBaseOnEvents();
            }
            RunAllTask(); // 事件处理完成后，执行任务队列的所有任务
        }
    }

    /*新增接口！为了迎合Connection模块的Upgrade*/
    void AssertInLoop()
    {
        if(_thread_id != std::this_thread::get_id()) abort();
    }

    void RunInLoop(const Functor &cb)
    {
        if (IsInFirstThread())
        {
            cb();
            return;
        }
        PushTaskToQueue(cb);
    }

    void UpdateEvent(Channel *pc) { _poller_ptr->UpdateEvent(pc); }
    void RemoveEvent(Channel *pc) { _poller_ptr->RemoveEvent(pc); }
    void AddTimer(uint32_t timerid, TimerCallBack cb, uint32_t timeout) { _tw.AddTimer(timerid, cb, timeout); }
    void TimerDel(uint32_t timerid) { _tw.TimerDel(timerid); }
    void TimerRefresh(uint32_t timerid) { _tw.TimerRefresh(timerid); }
    bool HasTimer(uint32_t timerid) { return _tw.HasTimer(timerid); }

private:
    bool IsInFirstThread() // 当前EventLoop是否处于原来的线程中
    {
        return _thread_id == std::this_thread::get_id();
    }

    void PushTaskToQueue(const Functor &cb)
    {
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _tasks.push_back(cb);
        }
        WriteToEventfd();
    }

    static int GetEventFd() // 获取evenfd的返回值
    {
        int ret = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
        if (ret < 0)
        {
            ERROR_LOG("eventfd error!");
            abort();
        }
        return ret;
    }

    void ReadFromEventfd() // 从eventfd当中读取数据
    {
        int64_t val = 0;
        int ret = read(_event_fd, &val, sizeof(val)); // 读了之后就会将内容清空
        if (ret == -1)
        {
            if(errno == EINTR || errno == EAGAIN)
            ERROR_LOG("eventfd error!");
            abort();
        }
    }

    void WriteToEventfd() // 向eventfd当中写数据
    {
        int64_t val = 1;
        int ret = write(_event_fd, &val, sizeof(val)); // 读了之后就会将内容清空
        if (ret < -1)
        {
            ERROR_LOG("eventfd error!");
            abort();
        }
    }

    void RunAllTask()
    {
        std::vector<Functor> tmp;
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _tasks.swap(tmp); // 直接交换，交换完成后任务队列就为空了，其他线程想执行任务就执行不了了
        }
        for (auto &e : tmp)
        {
            e();
        }
    }

    std::thread::id _thread_id; // 记录创建该EventLoop对象的线程ID
    std::vector<Functor> _tasks;
    std::mutex _mutex; // 保证压入任务队列时的线程安全
    int _event_fd;
    std::unique_ptr<Poller> _poller_ptr;
    std::unique_ptr<Channel> _channel_ptr; // 这个Channel主要负责eventfd
    TimerWheel _tw;                        // 定时器
};

/// Runs one EventLoop on a dedicated thread.
///
/// The EventLoop is a local variable of the thread function, so it is constructed on the
/// thread that runs it; GetLoop() blocks until that has happened.
///
/// NOTE(review): there is no destructor, so destroying a LoopThread while its thread is
/// still joinable ends in std::terminate (std::thread's destructor). The loop has no stop
/// mechanism, so joining would block forever; objects of this type are effectively
/// expected to live for the rest of the process.
class LoopThread
{
private:
    // Thread entry point: build the loop, publish its address, run it.
    void ThreadEntry()
    {
        EventLoop loop;
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _loop = &loop;
            _cond.notify_all();
        }
        loop.Start();
    }
public:
    LoopThread()
        : _loop(nullptr), _thread(&LoopThread::ThreadEntry, this)
    {}

    /// Returns the thread's EventLoop, waiting until the thread has created it.
    EventLoop *GetLoop()
    {
        EventLoop *loop = nullptr;
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _cond.wait(lock, [&]() { return _loop != nullptr; });
            loop = _loop;
        }
        return loop; // the value read under the lock
    }
private:
    EventLoop *_loop; // set by the worker thread once its loop exists
    std::mutex _mutex;
    std::condition_variable _cond;
    // Declared last on purpose: members are constructed in declaration order, and the
    // thread starts running ThreadEntry() as soon as it is constructed, so the mutex and
    // condition variable it uses must already exist.
    std::thread _thread;
};

/// A set of LoopThreads handed out round-robin. With a thread count of 0 (the default)
/// every request is served by the base loop passed to the constructor.
///
/// NOTE(review): the LoopThread objects are allocated with `new` and never deleted. That
/// is left as is because LoopThread cannot currently be destroyed safely while its
/// thread is running.
class LoopThreadPool
{
public:
    LoopThreadPool(EventLoop *baseloop)
        : _thread_count(0), _next_loop(0), _baseloop(baseloop)
    {}

    void SetThreadCount(size_t size) { _thread_count = static_cast<int>(size); }

    /// Starts `_thread_count` threads and collects their loops. Each GetLoop() call
    /// blocks until the corresponding thread has created its EventLoop.
    void Create()
    {
        if (_thread_count > 0)
        {
            _threads.resize(_thread_count);
            _loops.resize(_thread_count);
            for (int i = 0; i < _thread_count; i++)
            {
                _threads[i] = new LoopThread();
                _loops[i] = _threads[i]->GetLoop();
            }
        }
    }

    /// Returns the next loop in round-robin order, or the base loop when the pool has no
    /// worker loops — either because the thread count is 0 or because Create() has not
    /// been called yet (indexing the empty vector would be out of bounds).
    EventLoop *NextLoop()
    {
        if (_thread_count == 0 || _loops.empty()) return _baseloop;
        _next_loop = (_next_loop + 1) % static_cast<int>(_loops.size());
        return _loops[_next_loop];
    }
private:
    int _thread_count;    // number of worker threads requested
    int _next_loop;       // index of the loop handed out most recently
    EventLoop *_baseloop; // loop used when there are no workers
    std::vector<LoopThread *> _threads;
    std::vector<EventLoop *> _loops;
};


// Stops monitoring readability: clears EPOLLIN and pushes the new mask to the loop.
// `inline` because this definition lives in a header.
inline void Channel::CancelReadMonitor()
{
    _events &= (~EPOLLIN);
    _pl->UpdateEvent(this);
}
// Stops monitoring writability: clears EPOLLOUT and pushes the new mask to the loop.
// `inline` because this definition lives in a header.
inline void Channel::CancelWriteMonitor()
{
    _events &= (~EPOLLOUT);
    _pl->UpdateEvent(this);
}
// Clears EPOLLHUP from the registered mask and pushes the new mask to the loop.
// NOTE(review): Channel::CallBackBaseOnEvents runs the "abnormal" callback for EPOLLERR,
// not EPOLLHUP — confirm which flag this function was meant to clear.
// `inline` because this definition lives in a header.
inline void Channel::CancelAbnormalMonitor()
{
    _events &= (~EPOLLHUP);
    _pl->UpdateEvent(this);
}
// Clears EPOLLRDHUP from the registered mask and pushes the new mask to the loop.
// NOTE(review): Channel::CallBackBaseOnEvents runs the "broken" callback for EPOLLHUP,
// while EPOLLRDHUP is dispatched to the read callback — confirm the intended flag.
// `inline` because this definition lives in a header.
inline void Channel::CancelBrokenMonitor()
{
    _events &= (~EPOLLRDHUP);
    _pl->UpdateEvent(this);
}
// Clears the whole registered mask and pushes it to the loop. The channel stays
// registered with the loop; use Remove() to unregister it.
// `inline` because this definition lives in a header.
inline void Channel::CancelAllMonitor()
{
    _events = 0;
    _pl->UpdateEvent(this);
}

// Replaces the registered mask with `events` and pushes it to the loop.
// `inline` because this definition lives in a header.
inline void Channel::SetCurrentEvents(int32_t events)
{
    _events = events;
    _pl->UpdateEvent(this);
}

// Pushes the currently registered mask to the loop (adds the channel if it is new).
// `inline` because this definition lives in a header.
inline void Channel::Update()
{
    _pl->UpdateEvent(this);
}

// Unregisters this channel from the loop.
// `inline` because this definition lives in a header.
inline void Channel::Remove()
{
    _pl->RemoveEvent(this);
}


// Adds a timer. The actual insertion is handed to the EventLoop via RunInLoop.
// `inline` because this definition lives in a header.
inline void TimerWheel::AddTimer(uint32_t timerid, TimerCallBack cb, uint32_t timeout)
{
    _loop->RunInLoop(std::bind(&TimerWheel::AddTimerToQueue, this, timerid, cb, timeout));
}

// Postpones a timer. The actual refresh is handed to the EventLoop via RunInLoop.
// `inline` because this definition lives in a header.
inline void TimerWheel::TimerRefresh(uint32_t timerid)
{
    _loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshToQueue, this, timerid));
}

// Cancels a timer. The actual cancellation is handed to the EventLoop via RunInLoop.
// `inline` because this definition lives in a header.
inline void TimerWheel::TimerDel(uint32_t timerid)
{
    _loop->RunInLoop(std::bind(&TimerWheel::TimerDelToQueue, this, timerid));
}

/// Type-erased holder for a single value of any copyable type.
///
/// The value lives on the heap behind a polymorphic `holder`; copying an Any deep-copies
/// the value, so every Any owns its own object.
class Any
{
public:
    /// Creates an empty Any.
    Any() : _content(nullptr) {}
    ~Any() { delete _content; }

    /// Returns a pointer to the stored value if it has exactly type T, otherwise nullptr
    /// (including when the Any is empty).
    template <class T>
    T *get()
    {
        if (_content == nullptr || typeid(T) != _content->type())
            return nullptr;
        return &static_cast<placeholder<T> *>(_content)->_val;
    }

    /// Copy constructor: clones the stored value. This has to be a non-template
    /// constructor — a constructor template is never used as the copy constructor, and
    /// the implicit copy would duplicate the raw pointer and lead to a double delete.
    Any(const Any &any) : _content(any._content ? any._content->clone() : nullptr) {}

    /// Move constructor: takes over the stored value and leaves `any` empty.
    Any(Any &&any) noexcept : _content(any._content) { any._content = nullptr; }

    /// Copy assignment via copy-and-swap.
    Any &operator=(const Any &any)
    {
        Any(any).swap(*this);
        return *this;
    }

    void swap(Any &any)
    {
        std::swap(_content, any._content);
    }

    /// Stores a copy of `val`, replacing whatever was held before.
    template <class T>
    Any &operator=(const T &val)
    {
        Any(val).swap(*this);
        return *this;
    }

    /// Creates an Any holding a copy of `val`.
    template <class T>
    Any(const T &val) : _content(new placeholder<T>(val)) {}

private:
    // Type-independent interface of the stored value.
    class holder
    {
    public:
        virtual ~holder() {}
        virtual const std::type_info &type() = 0;
        virtual holder *clone() = 0;
    };

    // Concrete storage for a value of type T.
    template <class T>
    class placeholder : public holder
    {
    public:
        placeholder(const T &val) : _val(val) {}
        virtual const std::type_info &type() { return typeid(T); }
        virtual holder *clone() { return new placeholder(_val); }
        T _val;
    };
    holder *_content; // owned; nullptr when empty
};

class Connection;
// Lifecycle states of a Connection.
enum ConnStatus
{
    DISCONNECTED, // released; the descriptor is closed
    CONNECTING,   // constructed, Established() not yet run
    CONNECTED,    // established and being monitored
    DISCONNECTING // shutdown requested; waiting for pending output to be sent
};
// Connections are handled through shared_ptr so that whoever holds one knows the object
// is still alive (a raw pointer cannot tell).
using ConnectionPtr = std::shared_ptr<Connection>;

/// One TCP connection bound to an EventLoop.
///
/// Every public operation is forwarded to the loop with RunInLoop, so the state is only
/// touched on the loop thread. Queued operations hold a shared_ptr to the connection so
/// it cannot be destroyed while a task for it is still waiting in the queue; as a
/// consequence (and because the callbacks receive shared_from_this()) a Connection must
/// be owned by a std::shared_ptr.
class Connection : public std::enable_shared_from_this<Connection>
{
    using ConnectedCallBack = std::function<void(const ConnectionPtr &)>;
    using ClosedCallBack = std::function<void(const ConnectionPtr &)>;
    using AnyCallBack = std::function<void(const ConnectionPtr &)>;
    using MessageCallBack = std::function<void(const ConnectionPtr &, Buffer *)>; // invoked with the input buffer

private:
    // Is EPOLLOUT currently part of the *registered* events? Channel::IsWrite() cannot
    // be used for this: it reports the events that were last *triggered*, which may be
    // stale and would make a later send skip re-enabling write monitoring.
    bool IsWriteMonitored()
    {
        return (_channel.GetCurrentEvnts() & EPOLLOUT) != 0;
    }

    // Queues data for sending. Nothing is written here: the data is appended to the
    // output buffer and write monitoring is enabled; WriteHandle() does the sending.
    void SendInLoop(Buffer &buf)
    {
        if (_status == DISCONNECTED) return;

        _outbuffer.Write(buf);
        if (!IsWriteMonitored())
        {
            _channel.EnableWrite();
        }
    }

    // Graceful shutdown: deliver unread input, flush pending output, then release.
    void ShutDownInLoop()
    {
        if (_status == DISCONNECTED) return; // already released; do not revive the state machine
        _status = DISCONNECTING;

        if (_inbuffer.ReadableSpaceSize() > 0)
        {
            if (_message_callback) _message_callback(shared_from_this(), &_inbuffer);
        }
        if (_outbuffer.ReadableSpaceSize() > 0)
        {
            // WriteHandle() releases the connection once the output has drained.
            if (!IsWriteMonitored())
            {
                _channel.EnableWrite();
            }
        }
        if (_outbuffer.ReadableSpaceSize() == 0)
        {
            ReleaseConn();
        }
    }

    // Enables release-on-inactivity after `sec` seconds; refreshes the timer if it exists.
    void StartInactivBrokenInLoop(int sec)
    {
        _inactive_broken = true;
        if (_loop->HasTimer(_connid)) _loop->TimerRefresh(_connid);
        else _loop->AddTimer(_connid, std::bind(&Connection::ReleaseConn, this), sec);
    }

    // Disables release-on-inactivity.
    void CancelInactiveBrokenInLoop()
    {
        _inactive_broken = false;
        if (_loop->HasTimer(_connid)) _loop->TimerDel(_connid);
    }

    // Replaces the protocol context and all user callbacks in one step.
    void UpgradeInLoop(const Any &context, const ConnectedCallBack &conn,
                 const ClosedCallBack &close, const AnyCallBack &any,
                 const MessageCallBack &message)
    {
        _context = context;
        _connected_callback = conn;
        _closed_callback = close;
        _any_callback = any;
        _message_callback = message;
    }

    // Completes setup: CONNECTING -> CONNECTED, start read monitoring, notify the user.
    // Aborts if the connection is not in the CONNECTING state.
    void EstablishedInLoop()
    {
        if (_status != CONNECTING) abort();
        _status = CONNECTED;

        _channel.EnableRead();
        if (_connected_callback) _connected_callback(shared_from_this());
    }

    // Releases the connection: unregister, close, cancel the inactivity timer, notify.
    // Idempotent — several paths (shutdown, error handlers, the inactivity timer) can
    // request a release, and running it twice would close the descriptor again and
    // invoke the close callbacks a second time.
    void ReleaseConnInLoop()
    {
        if (_status == DISCONNECTED) return;
        _status = DISCONNECTED;

        // Keeps this object alive until both callbacks have run, even if one of them
        // drops the last external reference.
        ConnectionPtr self = shared_from_this();

        _channel.Remove();
        _socket.Close();
        DEBUG_LOG("文件描述符绝对删了");
        if (_loop->HasTimer(_connid)) CancelInactiveBrokenInLoop();
        if (_closed_callback) _closed_callback(self);
        if (_server_closed_callback) _server_closed_callback(self);
    }

    // ---- Channel event handlers ----

    // Readable: pull data into the input buffer and hand it to the message callback.
    void ReadHandle()
    {
        char buffer[65536]; // filled by Recv; only the first `ret` bytes are used
        ssize_t ret = _socket.Recv(buffer, sizeof(buffer) - 1, MSG_DONTWAIT);
        if (ret == -1) // read failed or the peer closed: shut down our side too
        {
            ShutDownInLoop();
            return;
        }
        _inbuffer.Write(buffer, ret);

        if (_inbuffer.ReadableSpaceSize() > 0)
        {
            if (_message_callback) _message_callback(shared_from_this(), &_inbuffer);
        }
    }

    // Writable: send as much of the output buffer as the socket accepts.
    void WriteHandle()
    {
        ssize_t ret = _socket.Send(_outbuffer.ReaderPosition(), _outbuffer.ReadableSpaceSize(), MSG_DONTWAIT);
        if (ret == -1)
        {
            // Send failed: deliver whatever input is still pending, then release.
            if (_inbuffer.ReadableSpaceSize() > 0)
            {
                if (_message_callback) _message_callback(shared_from_this(), &_inbuffer);
            }
            ReleaseConn();
            return;
        }
        _outbuffer.ReaderOffset(ret);            // consume what was sent
        if (_outbuffer.ReadableSpaceSize() == 0) // nothing left to send
        {
            _channel.CancelWriteMonitor();
            if (_status == DISCONNECTING) // a shutdown was waiting for the output to drain
            {
                ReleaseConn();
                return;
            }
        }
    }

    // Error on the descriptor: handled exactly like a hang-up.
    void ErrorHandle()
    {
        BrokenHandle();
    }

    // Hang-up: deliver pending input, then release.
    void BrokenHandle()
    {
        DEBUG_LOG("触发了断开事件");
        if (_inbuffer.ReadableSpaceSize() > 0)
        {
            if (_message_callback) _message_callback(shared_from_this(), &_inbuffer);
        }
        ReleaseConn();
    }

    // Read or write activity: refresh the inactivity timer (if enabled), notify the user.
    void AnyHandle()
    {
        if (_inactive_broken)
        {
            _loop->TimerRefresh(_connid);
        }
        if (_any_callback) _any_callback(shared_from_this());
    }

public:
    /// @param loop   the EventLoop this connection belongs to.
    /// @param connid identifier of the connection; also used as its timer id.
    /// @param sockfd the connected socket; closed when the connection is released.
    Connection(EventLoop *loop, uint64_t connid, int sockfd)
        : _connid(connid), _sockfd(sockfd), _socket(_sockfd), _loop(loop), _channel(_loop, _sockfd),
        _status(CONNECTING), _inactive_broken(false)
    {
        _channel.SetReadCallBack(std::bind(&Connection::ReadHandle, this));
        _channel.SetWriteCallBack(std::bind(&Connection::WriteHandle, this));
        _channel.SetBrokenCallBack(std::bind(&Connection::BrokenHandle, this));
        _channel.SetAbnormalCallBack(std::bind(&Connection::ErrorHandle, this));
        _channel.SetAnyCallBack(std::bind(&Connection::AnyHandle, this));
    }

    ~Connection() { DEBUG_LOG("~Connection()"); }

    /// Queues `len` bytes from `data` for sending. The bytes are copied before this
    /// returns, so the caller's storage may be reused immediately.
    void Send(void *data, size_t len)
    {
        Buffer tmp;
        tmp.Write(data, len);
        _loop->RunInLoop(std::bind(&Connection::SendInLoop, shared_from_this(), tmp));
    }

    /// Requests a graceful shutdown (pending output is sent first).
    void ShutDown()
    {
        _loop->RunInLoop(std::bind(&Connection::ShutDownInLoop, shared_from_this()));
    }

    /// Releases the connection without waiting for pending output.
    void ReleaseConn()
    {
        _loop->RunInLoop(std::bind(&Connection::ReleaseConnInLoop, shared_from_this()));
    }

    /// Enables release after `sec` seconds without activity.
    void StartInactivBroken(int sec)
    {
        _loop->RunInLoop(std::bind(&Connection::StartInactivBrokenInLoop, shared_from_this(), sec));
    }

    /// Disables release-on-inactivity.
    void CancelInactiveBroken()
    {
        _loop->RunInLoop(std::bind(&Connection::CancelInactiveBrokenInLoop, shared_from_this()));
    }

    /// Switches protocol: replaces the context and all user callbacks.
    /// Must be called on the loop thread (asserted). It must not go through the task
    /// queue: a queued switch would take effect late, and events arriving in between
    /// would still be handled with the old protocol.
    void Upgrade(const Any &context, const ConnectedCallBack &conn,
                 const ClosedCallBack &close, const AnyCallBack &any,
                 const MessageCallBack &message)
    {
        _loop->AssertInLoop();
        _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, shared_from_this(), context, conn, close, any, message));
    }

    /// Finishes setup after construction: starts event monitoring and notifies the user.
    void Established()
    {
        _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, shared_from_this()));
    }

    uint64_t GetConnid() { return _connid; }

    int GetSockfd() { return _sockfd; }

    bool Connected() { return _status == CONNECTED; } // has the connection been fully established?

    Any *GetContext() // protocol context
    {
        return &_context;
    }

    void SetContext(const Any &context)
    {
        _context = context;
    }

    void SetConnectedCallBack(const ConnectedCallBack &cb) { _connected_callback = cb; }

    void SetClosedCallBack(const ClosedCallBack &cb) { _closed_callback = cb; }

    void SetAnyCallBack(const AnyCallBack &cb) { _any_callback = cb; }

    void SetMessageCallBack(const MessageCallBack &cb) { _message_callback = cb; }

    void SetServerClosedCallback(const ClosedCallBack &cb) { _server_closed_callback = cb; }

private:
    uint64_t _connid;   // identifies this connection; doubles as its timer id
    int _sockfd;        // the managed descriptor
    Socket _socket;     // wrapper used for I/O on the descriptor
    EventLoop *_loop;   // owning loop; all operations run on its thread
    Channel _channel;   // event registration for the descriptor
    ConnStatus _status; // current lifecycle state
    Any _context;       // protocol context
    Buffer _inbuffer;   // received, not yet consumed
    Buffer _outbuffer;  // queued, not yet sent
    bool _inactive_broken; // is release-on-inactivity enabled?

    ConnectedCallBack _connected_callback;
    ClosedCallBack _closed_callback;        // user hook run on close; does not itself close anything
    ClosedCallBack _server_closed_callback; // server-side hook run on close, set via SetServerClosedCallback
    AnyCallBack _any_callback;
    MessageCallBack _message_callback;
};



// Acceptor owns the listening socket. Its Channel's read callback accepts one
// pending connection per readable event and hands the new descriptor to the
// callback installed with SetAcceptorCallBack().
//
// Typical use: construct, SetAcceptorCallBack(...), then Listen().
class Acceptor
{
using AcceptorCallBack = std::function<void(int)>; // receives the accepted descriptor
public:
    // loop: EventLoop on which the listening socket is monitored.
    // port: port passed to Socket::Bind().
    //
    // The initializer list is written in member declaration order, which is the
    // order in which the members are actually initialized.
    Acceptor(EventLoop *loop,int port)
        :_socket(CreateServer(port)),_loop(loop),_channel(_loop,_socket.GetFd())
    {
        _channel.SetReadCallBack(std::bind(&Acceptor::ReadHandle,this));
        // Event monitoring must not be enabled in the constructor: if an event fired
        // before the acceptor callback was installed, the new socket would not be
        // handled. Monitoring is started explicitly through Listen().
    }

    // Starts monitoring the listening socket for EPOLLIN.
    // NOTE(review): confirm that Channel::SetCurrentEvents() registers interest with
    // the poller (the constructor comment historically referred to EnableRead()).
    void Listen() {_channel.SetCurrentEvents(EPOLLIN);}

    // Installs the callback invoked with every accepted descriptor.
    void SetAcceptorCallBack(const AcceptorCallBack &cb) {_callback = cb;}
private:
    // Creates (with address reuse enabled), binds and listens on the server socket,
    // then returns its descriptor.
    // NOTE(review): this runs inside the initializer of `_socket`, so it calls member
    // functions on `_socket` before that member's constructor has run - confirm that
    // this is safe for Socket.
    int CreateServer(int port)
    {
        _socket.Create(true);// enable address reuse
        _socket.Bind(port);
        _socket.Listen();
        return _socket.GetFd();
    }

    // Read-event callback of the Channel: accepts one connection and dispatches it.
    void ReadHandle()
    {
        int connfd = _socket.Accept();
        if(connfd == -1)
        {
            // A failed accept is not treated as fatal: accept() can fail for transient
            // reasons (nothing pending any more, interrupted call, peer already gone,
            // descriptor limit reached). Log it and keep the server running.
            ERROR_LOG("Socket->Accept error!");
            return;
        }
        if(_callback)
        {
            _callback(connfd);
        }
        else
        {
            // Nobody takes ownership of the descriptor, so close it to avoid a leak.
            ::close(connfd);
        }
    }
private:
    Socket _socket;
    EventLoop *_loop;// loop on which the listening socket is monitored
    Channel _channel;
    AcceptorCallBack _callback;
};



// TcpServer ties the pieces together: a base EventLoop that runs the Acceptor,
// a LoopThreadPool whose loops are handed to new connections via NextLoop(), and
// a table of all live Connection objects keyed by connection id.
//
// Typical use: construct with a port, install callbacks, optionally call
// SetThreadCount() / StartInactiveBroken() / AddCrontab(), then Start().
class TcpServer
{
private:    
    using ConnectedCallBack = std::function<void(const ConnectionPtr &)>;
    using ClosedCallBack = std::function<void(const ConnectionPtr &)>;
    using AnyCallBack = std::function<void(const ConnectionPtr &)>;
    using MessageCallBack = std::function<void(const ConnectionPtr &, Buffer *)>;

    // Acceptor callback: wraps an accepted descriptor in a Connection object.
    void NewConnection(int fd)
    {
        ++_nextid;
        ConnectionPtr conn(new Connection(_pool.NextLoop(),_nextid,fd));
        conn->SetConnectedCallBack(_connected_callback);
        conn->SetClosedCallBack(_closed_callback);
        conn->SetAnyCallBack(_any_callback);
        conn->SetMessageCallBack(_message_callback);
        conn->SetServerClosedCallback(std::bind(&TcpServer::RemoveConnection,this,std::placeholders::_1));
        // Register the connection before activating it. If activation ends up closing
        // the connection synchronously on the base loop, RemoveConnectionInLoop() must
        // find the entry; inserting afterwards would leave a dead entry in the table.
        _conns.insert(std::make_pair(_nextid,conn));
        if(_start_inactive_broken) conn->StartInactivBroken(_timeout);
        conn->Established();
    }

    // Erases the connection from the table; runs on the base loop.
    void RemoveConnectionInLoop(const ConnectionPtr &conn)
    {
        _conns.erase(conn->GetConnid());
    }

    // Installed on every Connection as its server-closed callback; forwards the
    // removal to the base loop through RunInLoop().
    void RemoveConnection(const ConnectionPtr &conn)
    {
        _baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop,this,conn));
    }

    // Adds a timer task to the base loop. The timer id is taken from the same
    // counter that produces connection ids.
    void AddCrontabInLoop(const Functor &func,int timeout)
    {
        ++_nextid;
        _baseloop.AddTimer(_nextid,func,timeout);
    }

public:
    // The four setters below store the callbacks that NewConnection() copies into
    // every Connection it creates.
    void SetConnectedCallBack(const ConnectedCallBack &cb) { _connected_callback = cb; }

    void SetClosedCallBack(const ClosedCallBack &cb) { _closed_callback = cb; }

    void SetAnyCallBack(const AnyCallBack &cb) { _any_callback = cb; }

    void SetMessageCallBack(const MessageCallBack &cb) { _message_callback = cb; }

    // Builds the acceptor for `port` on the base loop and asks it to listen.
    // The initializer list follows member declaration order, which is the order in
    // which the members are actually initialized.
    TcpServer(int port)
        :_nextid(0),
        _port(port),
        _acceptor(&_baseloop,_port),
        _pool(&_baseloop),
        _timeout(0),_start_inactive_broken(false)
    {
        _acceptor.SetAcceptorCallBack(std::bind(&TcpServer::NewConnection,this,std::placeholders::_1));
        _acceptor.Listen();
    }

    ~TcpServer() {DEBUG_LOG("~TcpServer()");}

    // Sets the number of threads in the loop thread pool.
    void SetThreadCount(int size) {_pool.SetThreadCount(size);}

    // Enables timed teardown of inactive connections; `timeout` is forwarded to
    // every connection created afterwards.
    void StartInactiveBroken(int timeout)
    {
        _timeout = timeout;
        _start_inactive_broken = true;
    }

    // Schedules an arbitrary timer task on the base loop.
    void AddCrontab(const Functor &func,int timeout)
    {
        _baseloop.RunInLoop(std::bind(&TcpServer::AddCrontabInLoop,this,func,timeout));
    }

    // Starts the server: creates the thread pool, then starts the base loop.
    void Start()
    {
        _pool.Create();
        _baseloop.Start();
    }
private:
    uint64_t _nextid;// source of connection ids (and crontab timer ids)
    int _port;
    EventLoop _baseloop;// EventLoop of the main thread
    Acceptor _acceptor;// manages the listening socket
    LoopThreadPool _pool;// thread pool holding the EventLoops of the worker threads
    int _timeout;// timeout used for inactive-connection teardown
    bool _start_inactive_broken;// whether inactive-connection teardown is enabled
    std::unordered_map<uint64_t,ConnectionPtr> _conns;// all live Connection objects, keyed by id

    ConnectedCallBack _connected_callback;
    ClosedCallBack _closed_callback;        // does NOT close the connection; extra work the library user wants done on close
    AnyCallBack _any_callback;
    MessageCallBack _message_callback;
};